package tableapi.udf

import bean.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

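/**
  * Demonstrates a user-defined scalar function (UDF) in Flink: the same
  * `HashCode` function is applied to the sensor id once through the
  * Table API and once through SQL.
  *
  * @author xiao kun tai (QQ 1667847363)
  * @date 2021/11/28 23:10
  */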
object Test1_ScalaFunction {

  /**
    * Builds and runs the demo job: reads sensor records from a text file,
    * exposes them as a table with an event-time attribute, and prints the
    * result of applying the `HashCode` UDF via the Table API and via SQL.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // File source: one comma-separated record per line.
    // (For live testing a socket source, e.g. env.socketTextStream(host, port),
    // can be substituted for the file stream.)
    val inputPath: String = "src/main/resources/sensor.txt"
    val fileStream: DataStream[String] = env.readTextFile(inputPath)

    // Convert each raw line into a SensorReading, then assign event-time
    // timestamps with a 1-second bound on out-of-orderness.
    val dataStream: DataStream[SensorReading] = fileStream
      .map { line =>
        // Trim every field so records written as "id, ts, temp" parse too;
        // String#toLong rejects input with surrounding whitespace.
        val fields = line.split(",").map(_.trim)
        // Fields are passed positionally; presumably (id, timestamp, temperature) — confirm against SensorReading.
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      }
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          // Scaled by 1000: presumably the source timestamps are in seconds and
          // are converted to milliseconds here — confirm against the data file.
          override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
        })

    val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // Create the table environment on top of the streaming environment.
    val tableEnv = StreamTableEnvironment.create(env, settings)

    // The 'timestamp field is declared as the rowtime attribute and exposed as 'ts.
    val sensorTable: Table =
      tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)

    // --- Table API: instantiate the UDF and call it directly in the expression ---
    // Named hashCodeFn so it does not shadow Any#hashCode inside this method.
    val hashCodeFn = new HashCode(23)
    val resultTable: Table = sensorTable.select('id, 'ts, hashCodeFn('id))
    resultTable.toAppendStream[Row].print("result")

    // --- SQL: the UDF has to be registered in the table environment first ---
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("hashCode", hashCodeFn)

    val resultSqlTable: Table = tableEnv.sqlQuery(
      """
        |select id,
        |ts,
        |hashCode(id) from sensor
      """.stripMargin)
    resultSqlTable.toAppendStream[Row].print("sql")

    env.execute("udf test")
  }

  /**
    * Scalar UDF that maps a string to an integer derived from its hash code.
    *
    * @param factor multiplier applied to the string's hash code
    */
  class HashCode(factor: Int) extends ScalarFunction {

    /**
      * Computes the scaled hash of the given string.
      *
      * @param s input string; a null value causes a NullPointerException
      * @return `s.hashCode * factor - 10000`, using Int arithmetic (wraps on overflow)
      */
    def eval(s: String): Int = s.hashCode * factor - 10000
  }

}
